package com.example.dobs.demo.flink.realtime.report.study.flink;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import com.example.dobs.demo.flink.realtime.report.study.flink.bean.Event;

import javax.annotation.Nullable;

/**
 * Periodic watermark assigner for raw {@code String} records.
 *
 * <p>Each record is parsed with {@link Event#readByLine(String)} and the resulting
 * {@code timestamp} text is converted to a {@code long}. The assigner remembers the
 * largest timestamp it has seen and emits watermarks that trail that maximum by a
 * fixed bound, so records arriving out of order by up to that bound are not
 * considered late.
 *
 * <p>NOTE(review): the bound is presumably in milliseconds, matching the usual
 * Flink event-time unit — confirm against the timestamps produced by {@code Event}.
 */
public class BoundedOutOfOrdernessGenerator2 implements AssignerWithPeriodicWatermarks<String> {

    /** Bound used by the no-argument constructor. */
    private static final long DEFAULT_MAX_OUT_OF_ORDERNESS = 10000;

    /** How far the emitted watermark trails the largest timestamp seen so far. */
    private final long maxOutOfOrderness;

    /** Largest timestamp extracted so far; starts at 0 until a larger one is observed. */
    private long currentMaxTimestamp;

    /** Creates an assigner using the default bound of {@code 10000}. */
    public BoundedOutOfOrdernessGenerator2() {
        this(DEFAULT_MAX_OUT_OF_ORDERNESS);
    }

    /**
     * Creates an assigner with a caller-supplied bound.
     *
     * @param maxOutOfOrderness how far the watermark lags behind the largest timestamp
     *                          seen; must not be negative
     * @throws IllegalArgumentException if {@code maxOutOfOrderness} is negative
     */
    public BoundedOutOfOrdernessGenerator2(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException(
                    "maxOutOfOrderness must not be negative: " + maxOutOfOrderness);
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /**
     * Returns the current watermark: the largest timestamp seen so far minus the
     * configured bound. Before any record with a positive timestamp has been
     * processed this is {@code -maxOutOfOrderness}.
     *
     * @return a new watermark; this implementation never returns {@code null}
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    /**
     * Extracts the timestamp of a record and updates the running maximum used for
     * watermark generation.
     *
     * @param element                  the raw record line, parsed via {@link Event#readByLine(String)}
     * @param previousElementTimestamp timestamp previously attached to the record; not used here
     * @return the timestamp parsed from the record
     * @throws NumberFormatException if the record's timestamp text is not a parsable {@code long}
     */
    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        long timestamp = Long.parseLong(new Event().readByLine(element).timestamp);
        // Track the maximum rather than the latest value so that an out-of-order
        // record never moves the watermark backwards.
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}
